import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;


/**
 * Flink streaming job that reads from a Hive table via the Blink planner.
 *
 * <p>Registers a {@link HiveCatalog} backed by a local Hive conf directory,
 * switches the session to that catalog with the HIVE SQL dialect, runs a
 * {@code SELECT * FROM mytable} query, and prints the result as a retract
 * stream (Boolean flag = insert/retract, Row = the record).
 */
public class Hive_Read {

    public static void main(String[] args)
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Build EnvironmentSettings selecting the Blink planner in streaming mode.
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        // Build the StreamTableEnvironment on top of the streaming environment.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        String name            = "myhive";
        String defaultDatabase = "db1";
        String hiveConfDir     = "/home/appleyuchi/bigdata/apache-hive-3.1.2-bin/conf"; // a local path
        String version         = "3.1.2";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);

        // Register the catalog and make it the session's current catalog.
        // Reuse `name` so the registration and useCatalog calls cannot drift apart.
        tEnv.registerCatalog(name, hive);
        tEnv.useCatalog(name);

        // Hive dialect is required so the query is parsed with Hive semantics.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // Query to run against the Hive table (previously misnamed "createDbSql").
        String selectSql = "SELECT  *  FROM  mytable ";

        Table table = tEnv.sqlQuery(selectSql);

        // Retract stream: Tuple2.f0 == true for inserts, false for retractions.
        DataStream<Tuple2<Boolean, Row>> demo = tEnv.toRetractStream(table, Row.class);
        demo.print("输出结果");

        try
        {
            env.execute();
        } catch (Exception e)
        {
            // Fail loudly instead of swallowing the error: preserve the cause
            // so the job exits non-zero and the stack trace reaches the driver.
            throw new RuntimeException("Flink job execution failed", e);
        }
    }

}
